Conversation

Contributor

@muhamadazmy muhamadazmy commented Nov 17, 2025

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for replacing the direct Bifrost write with the IngressClient in the Shuffle @muhamadazmy. Maybe the name IngressClient does not fit 100% given that now also the Shuffle uses it. Maybe something like IngestionClient or so works better. Given that we don't use the send window of the IngressClient yet, I wouldn't expect a different runtime behavior of the shuffle. Once we have this, I would be interested in how the overall shuffle throughput increases by using the IngressClient.

I left a few minor comments for your consideration.

Comment on lines 228 to 233
ingress
    .ingest(
        msg.partition_key(),
        IngestRecord::from_parts(msg.record_keys(), msg),
    )
    .await?;
Contributor

As a follow-up to this PR, we should make use of being able to send more than a single record at a time to maximize our throughput. I guess this will require an overhaul of the Shuffle component. I think quite a few things can be simplified here (no more pin projecting if we don't require shuffle_next_message to run in the select arm, etc.).

Contributor Author

I totally agree.

networking.clone(),
Metadata::with_current(|m| m.updateable_partition_table()),
partition_routing.clone(),
NonZeroUsize::new(5 * 1024 * 1024).unwrap(), // 5MB
Contributor

How to best size the buffer window to fully utilize the network connections between the partition processors? Should this be something along the lines of RTT * bandwidth * #nodes * 2 to be able to keep all connections fully utilized?

Probably also a good idea to make this configurable.
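
To put rough numbers on the sizing question, here is a minimal sketch of the RTT * bandwidth * #nodes * 2 rule of thumb. The function name `suggested_window_bytes` and all of its parameters are hypothetical and not part of the PR; the inputs would have to be measured or configured for a given deployment.

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

/// Hypothetical helper (not part of the PR): estimates an in-flight window
/// as RTT * bandwidth * #nodes * headroom, i.e. the bandwidth-delay product
/// per connection, scaled by the number of peers and a safety factor.
fn suggested_window_bytes(
    rtt: Duration,
    bandwidth_bytes_per_sec: u64,
    peer_nodes: u32,
    headroom: u32,
) -> Option<NonZeroUsize> {
    // Bytes needed to keep a single connection busy for one round trip.
    let per_connection =
        (bandwidth_bytes_per_sec as u128 * rtt.as_micros()) / 1_000_000;
    // Scale by the number of peers and the headroom factor, guarding overflow.
    let total = per_connection
        .checked_mul(peer_nodes as u128)?
        .checked_mul(headroom as u128)?;
    // A zero-sized window is not useful, so it maps to `None`.
    NonZeroUsize::new(usize::try_from(total).ok()?)
}
```

Under these assumptions, a 1 ms RTT on a 10 Gbit/s link (1,250,000,000 bytes/s) with 3 peer nodes and a factor of 2 gives 7,500,000 bytes, which is in the same range as the hard-coded `5 * 1024 * 1024` value.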

Contributor Author

That's the buffer for the inflight records, which is shared across partition sessions. Eventually it should be different from the partition session chunk size (work in progress), which is the one that has to respect the max network request size.
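
To illustrate the distinction, a per-session chunk limit could be sketched roughly as follows. This is only an assumption about how chunking might work, since the chunk size is still work in progress; `chunk_by_size` is a hypothetical name and records are represented by their encoded sizes only.

```rust
/// Hypothetical sketch (not the PR's implementation): groups consecutive
/// records, given by their encoded sizes in bytes, into chunks whose total
/// size stays within `max_chunk_bytes`.
fn chunk_by_size(record_sizes: &[usize], max_chunk_bytes: usize) -> Vec<Vec<usize>> {
    let mut chunks = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;

    for &size in record_sizes {
        // Close the current chunk if adding this record would exceed the limit.
        if !current.is_empty() && current_bytes.saturating_add(size) > max_chunk_bytes {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        // A record larger than the limit still ends up alone in its own chunk.
        current.push(size);
        current_bytes = current_bytes.saturating_add(size);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```

The shared inflight buffer would then bound the sum over all sessions, while each chunk is bounded separately by the request size limit.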

@tillrohrmann
Contributor

There seem to be a few test failures on GHA.

Comment on lines 228 to 233
ingress
    .ingest(
        msg.partition_key(),
        IngestRecord::from_parts(msg.record_keys(), msg),
    )
    .await?;
Contributor

What about support for rolling upgrades?

Contributor Author

ingest does not fail unless the ingestion client is closed. This means the worst case is that it blocks until the leaders are responsive.

Contributor Author

I think that, to avoid this situation, we could release support for the Ingest messages in the PP first, before actually using them in the following release.

@muhamadazmy muhamadazmy force-pushed the pr4024 branch 15 times, most recently from d6e9955 to 8c0797e Compare December 2, 2025 09:00
@muhamadazmy muhamadazmy force-pushed the pr4024 branch 4 times, most recently from d14fb10 to e1b4c8e Compare December 2, 2025 14:50
@muhamadazmy muhamadazmy changed the title Use ingress-client in the Shuffler Use ingestion-client in the Shuffler Dec 2, 2025
@muhamadazmy muhamadazmy marked this pull request as draft December 3, 2025 10:59
@muhamadazmy muhamadazmy force-pushed the pr4024 branch 8 times, most recently from f03e3c2 to 078d17d Compare December 4, 2025 10:43
@muhamadazmy muhamadazmy marked this pull request as ready for review December 4, 2025 10:44
- `ingestion-client` implements the runtime layer that receives WAL envelopes, fans them out to the correct partition, and tracks completion. It exposes:
  - `IngestionClient`, which enforces inflight budgets and resolves partition IDs before sending work downstream.
  - The session subsystem, which batches `IngestRecords`, retries connections, and reports commit status to callers.
- `ingestion-client` only ingests records and notifies the caller once a record is "committed" to bifrost by the PP. This makes it useful for implementing Kafka ingress and other external ingestion.
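
As an illustration of what enforcing an inflight budget can mean, here is a minimal, synchronous sketch using only the standard library. `InflightBudget` and its methods are hypothetical names; the actual `IngestionClient` is asynchronous and its internals are not shown in this conversation.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical byte budget shared by all partition sessions
/// (an illustration only, not the crate's real type).
struct InflightBudget {
    available: Mutex<usize>,
    freed: Condvar,
}

impl InflightBudget {
    fn new(capacity_bytes: usize) -> Self {
        Self {
            available: Mutex::new(capacity_bytes),
            freed: Condvar::new(),
        }
    }

    /// Reserves `bytes` without blocking; returns `false` if they do not fit.
    fn try_reserve(&self, bytes: usize) -> bool {
        let mut available = self.available.lock().unwrap();
        if *available >= bytes {
            *available -= bytes;
            true
        } else {
            false
        }
    }

    /// Blocks the caller until `bytes` fit into the remaining budget.
    /// Note: a request larger than the total capacity would wait forever.
    fn reserve(&self, bytes: usize) {
        let mut available = self.available.lock().unwrap();
        while *available < bytes {
            available = self.freed.wait(available).unwrap();
        }
        *available -= bytes;
    }

    /// Returns `bytes` to the budget, e.g. once a record is reported committed.
    fn release(&self, bytes: usize) {
        let mut available = self.available.lock().unwrap();
        *available += bytes;
        self.freed.notify_all();
    }
}
```

In this sketch, a caller reserves the record size before sending and releases it when the commit notification arrives, so a slow partition leader translates into backpressure rather than unbounded buffering.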
Summary:
Handle the incoming `IngestRequest` messages sent by the `ingestion-client`
Summary:
Refactor ingress-kafka to build on the `ingestion-client` implementation. This replaces
the previous direct write to bifrost, which allows:
- Batching, which increases throughput
- PP becomes the sole writer of its logs (WIP restatedev#3965)
- Use IngestionClient instead of bifrost to write to partitions logs
- Remove deprecated `delete_invocation`
Summary:
This PR makes sure the cleaner does not do an external bifrost write
by creating a cleaner effect stream that can be handled
directly by the PP event loop.
Avoid direct writes to bifrost in shuffler by using a
dedicated ingestion-client instance.